package com.shujia.flink.core

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimerService
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object Demo6Fraud {
  def main(args: Array[String]): Unit = {

    /*
1001,123
1001,23
1001,1
1001,1000
1001,300
1002,2
1002,0.5
1002,200
1002,0.5
1002,6000

     */

    /**
      * 对于一个账户，如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易，就输出一个报警信息。
      *
      */


    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val eventDS: DataStream[String] = env.socketTextStream("master", 8888)

    val kvDS: DataStream[(String, Double)] = eventDS.map(line => {
      val split: Array[String] = line.split(",")
      (split(0), split(1).toDouble)
    })


    //按照用户分组
    val keyByDS: KeyedStream[(String, Double), String] = kvDS.keyBy(_._1)

    /**
      * 进行欺诈检查
      * 返回结果
      * 用户编号，前一次的金额，后一次的金额
      *
      * 如果出现小于 $1 美元的交易后紧跟着一个大于 $500 的交易，就输出一个报警信息。
      *
      * 两次行为必须在一分钟内完成才发出报警
      *
      */

    val filterDS: DataStream[(String, Double, Double)] = keyByDS.process(new KeyedProcessFunction[String, (String, Double), (String, Double, Double)] {


      /**
        * 用来保存金额小于1的数据
        *
        */
      var valueState: ValueState[Double] = _

      override def open(parameters: Configuration): Unit = {

        val context: RuntimeContext = getRuntimeContext

        valueState = context.getState(new ValueStateDescriptor[Double]("money", classOf[Double]))
      }

      override def processElement(
                                   value: (String, Double),
                                   ctx: KeyedProcessFunction[String, (String, Double), (String, Double, Double)]#Context,
                                   out: Collector[(String, Double, Double)]): Unit = {

        val (id, money) = value

        //获取上一次的金额
        val lastMoney: Double = valueState.value()

        if (lastMoney != 0) {
          if (money > 500) {
            //发出一个报警信息。
            out.collect((id, lastMoney, money))
          } else {
            valueState.update(0)
          }
        }

        if (money < 1.0) {
          //将当前的金额保存到状态中
          valueState.update(money)


          println("注册定时器")
          //获取定时器对象
          val timerService: TimerService = ctx.timerService()
          //获取当前处理事件
          val currTime: Long = timerService.currentProcessingTime()

          //注册定时器
          /**
            *
            * 同时只能有一个定时器，按照最新的为准
            *
            * 当时间达到指定时间之后会触发onTimer方法的执行
            *
            */
          //一分钟之后触发onTimer执行
          timerService.registerProcessingTimeTimer(currTime + 60000)

        }
      }

      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, Double), (String, Double, Double)]#OnTimerContext, out: Collector[(String, Double, Double)]): Unit = {
        println("onTimer")
        //恢复到初始状态
        valueState.update(0)
      }
    })


    filterDS.print()

    env.execute()


  }

}
